---
title: Workflows and Decorators
description: "Understanding the Workflow class and decorator-based tool definition in mcp-agent"
---

## Overview

mcp-agent provides two ways to define agent logic:
1. **Workflow Class**: For complex, stateful agent workflows
2. **Tool Decorators**: For simple, stateless functions

Both approaches expose your agent logic as MCP tools that can be invoked by any MCP client.

## The Workflow Class

The `Workflow` class is the foundation for building complex agent behaviors. It provides:
- Type-safe input/output handling
- Automatic MCP tool registration
- Support for both asyncio and Temporal execution
- Built-in error handling and retries
- Workflow state management

### Basic Workflow Definition

```python
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult

app = MCPApp(name="my_agent")

@app.workflow
class MyWorkflow(Workflow[str]):
    """A simple workflow that processes text."""
    
    @app.workflow_run
    async def run(self, input_text: str) -> WorkflowResult[str]:
        # Your agent logic here
        processed = await self.process_text(input_text)
        return WorkflowResult(value=processed)
    
    async def process_text(self, text: str) -> str:
        # Helper method
        return text.upper()
```

### Generic Type Parameters

Workflows use Python generics to specify return types:

```python
# String output
class TextWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, prompt: str) -> WorkflowResult[str]:
        return WorkflowResult(value="response")

# Dictionary output
class DataWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, query: dict) -> WorkflowResult[dict]:
        return WorkflowResult(value={"result": "data"})

# Custom type output
from typing import List

from pydantic import BaseModel

class AnalysisResult(BaseModel):
    sentiment: str
    confidence: float
    entities: List[str]

class AnalysisWorkflow(Workflow[AnalysisResult]):
    @app.workflow_run
    async def run(self, text: str) -> WorkflowResult[AnalysisResult]:
        result = AnalysisResult(
            sentiment="positive",
            confidence=0.95,
            entities=["Company A", "Product B"]
        )
        return WorkflowResult(value=result)
```
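
To make the role of the type parameter concrete, here is a minimal, library-free sketch of a generic result container. `SketchResult` and `word_count` are hypothetical names written for illustration; this is not mcp-agent's `WorkflowResult`, whose fields and behavior may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class SketchResult(Generic[T]):
    """Hypothetical stand-in for a typed result wrapper (not mcp-agent's class)."""

    value: Optional[T] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def ok(self) -> bool:
        # A result counts as successful when no error message was recorded.
        return self.error is None


def word_count(text: str) -> SketchResult[int]:
    """Return the number of whitespace-separated words, wrapped in a result."""
    if not text.strip():
        return SketchResult(error="empty input")
    return SketchResult(value=len(text.split()), metadata={"chars": len(text)})
```

With the return annotation `SketchResult[int]`, a type checker can flag callers that treat `value` as anything other than an optional integer, which is the same benefit the `Workflow[...]` parameter is meant to give.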

### Workflow Properties

Inside `run`, a workflow can read its own identifiers and reach the app context:

```python
@app.workflow
class InspectableWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Unique workflow instance ID
        workflow_id = self.id
        
        # Unique run ID (for this execution)
        run_id = self.run_id
        
        # Access app context
        logger = app.context.logger
        logger.info(f"Running workflow {workflow_id}, run {run_id}")
        
        # Access configuration
        config = app.context.settings
        
        return WorkflowResult(value={"workflow_id": workflow_id})
```

### Error Handling

Return expected failures in the `WorkflowResult` and re-raise unexpected ones so the execution engine can retry:

```python
from pydantic import ValidationError  # assumed source of ValidationError

@app.workflow
class RobustWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        try:
            result = await self.risky_operation(input)
            return WorkflowResult(value=result)
        except ValidationError as e:
            # Return error in result
            return WorkflowResult(
                value=None,
                error=f"Validation failed: {e}",
                metadata={"error_type": "validation"}
            )
        except Exception as e:
            # Log and re-raise for retry
            app.context.logger.error(f"Workflow failed: {e}")
            raise
```
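
The split between "return the error" and "re-raise for retry" can be sketched without the library. The helper below is a hypothetical `retry_async`, not part of mcp-agent, and it does not describe the retry policy of either execution engine. It assumes `ValueError` stands for a validation-style failure that retrying cannot fix.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Call `operation` until it succeeds or `attempts` is exhausted.

    ValueError propagates immediately (non-retryable); any other exception
    triggers another attempt after an exponentially growing delay.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except ValueError:
            raise  # bad input will not improve on retry
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Keeping validation failures out of the retry path avoids repeating work that is certain to fail again.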

## Tool Decorators

For simpler use cases, mcp-agent provides decorator-based tool definition:

### @app.tool - Synchronous Tools

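The `@app.tool` decorator creates tools whose call returns the final result directly. "Synchronous" here describes the request/response behavior the client sees; the function itself can still be `async def`: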

```python
from typing import List, Optional

from mcp_agent.app import MCPApp
from mcp_agent.core.context import Context

app = MCPApp(name="utility_agent")

@app.tool
async def calculate_sum(numbers: List[float]) -> float:
    """Calculate the sum of a list of numbers."""
    return sum(numbers)

@app.tool(name="get-weather")
async def get_weather(
    city: str, 
    units: str = "celsius",
    app_ctx: Optional[Context] = None
) -> dict:
    """
    Get weather for a city.
    
    Args:
        city: City name
        units: Temperature units (celsius or fahrenheit)
    """
    # Access app context if needed
    if app_ctx:
        logger = app_ctx.logger
        logger.info(f"Getting weather for {city}")
    
    # Your logic here (fetch_weather_api is a placeholder for your own client)
    weather = await fetch_weather_api(city, units)
    return weather
```

Key features:
- Returns final result directly
- No workflow ID or polling needed
- Best for quick operations
- Supports optional `app_ctx` parameter for context access

### @app.async_tool - Asynchronous Tools

The `@app.async_tool` decorator creates tools that start work in the background and return immediately with identifiers the client can poll:

```python
@app.async_tool(name="analyze-document")
async def analyze_document_async(
    document_url: str,
    analysis_type: str = "summary",
    app_ctx: Optional[Context] = None
) -> dict:
    """
    Start document analysis asynchronously.
    
    Returns workflow_id and run_id for status polling.
    """
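    # app_ctx is needed to reach the executor; fail clearly if it is missing
    if app_ctx is None:
        raise RuntimeError("analyze-document requires an app context")

    # Start long-running analysis
    workflow = DocumentAnalysisWorkflow()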
    handle = await app_ctx.executor.start_workflow(
        workflow,
        {"url": document_url, "type": analysis_type}
    )
    
    # Return IDs for polling
    return {
        "workflow_id": workflow.id,
        "run_id": handle.id,
        "message": "Analysis started. Use workflows-get_status to check progress."
    }
```

Key features:
- Returns workflow/run IDs immediately
- Client polls for results using `workflows-get_status`
- Best for long-running operations
- Enables progress tracking
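
The client side of this pattern is a polling loop. The sketch below uses plain asyncio. `poll_until_done` is a hypothetical helper, and the status payload shape (a dict with a `"status"` key whose terminal values are `"completed"`, `"failed"`, or `"cancelled"`) is an assumption, so check it against what `workflows-get_status` actually returns.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Assumed terminal states; the real status values may differ.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


async def poll_until_done(
    get_status: Callable[[], Awaitable[Dict[str, Any]]],
    interval: float = 1.0,
    timeout: float = 60.0,
) -> Dict[str, Any]:
    """Poll `get_status` until it reports a terminal state or `timeout` elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status.get("status") in TERMINAL_STATES:
            return status
        # Stop before sleeping if the next poll would land past the deadline.
        if loop.time() + interval > deadline:
            raise TimeoutError("workflow did not finish before the timeout")
        await asyncio.sleep(interval)
```

In practice `get_status` would wrap a call to the status tool with the `workflow_id` and `run_id` returned by the async tool.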

### Tool Naming and Description

Control how your tools appear to MCP clients:

```python
@app.tool(
    name="search-knowledge-base",
    description="Search the knowledge base for relevant information"
)
async def search(
    query: str,
    limit: int = 10,
    filters: Optional[dict] = None
) -> List[dict]:
    """
    Detailed search implementation.
    
    Args:
        query: Search query
        limit: Maximum results
        filters: Optional filters
    """
    # The description parameter becomes the tool description
    # The docstring provides implementation details
    return await perform_search(query, limit, filters)
```
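
For readers curious how one decorator can accept both the bare form (`@app.tool`) and the parameterized form (`@app.tool(name=...)`), here is a small stand-alone sketch. `ToolRegistry` is hypothetical and is not how mcp-agent implements its decorators. Falling back to the first docstring line when no `description` is given is also an assumption made for this example.

```python
import inspect
from typing import Any, Callable, Dict, Optional


class ToolRegistry:
    """Hypothetical registry showing one way a tool decorator can work."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def tool(
        self,
        fn: Optional[Callable[..., Any]] = None,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
    ) -> Any:
        def register(func: Callable[..., Any]) -> Callable[..., Any]:
            doc = inspect.getdoc(func) or ""
            self.tools[name or func.__name__] = {
                # Explicit description wins; otherwise use the docstring's first line.
                "description": description or (doc.splitlines()[0] if doc else ""),
                "parameters": list(inspect.signature(func).parameters),
                "fn": func,
            }
            return func  # the function itself is left unchanged

        # Bare usage (@registry.tool) passes the function directly;
        # parameterized usage (@registry.tool(name=...)) returns the decorator.
        return register(fn) if fn is not None else register
```

Because the decorator returns the original function, decorated tools remain directly callable in unit tests.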

## Advanced Workflow Patterns

### Workflow Composition

Compose complex workflows from simpler ones:

```python
@app.workflow
class CompositeWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # Run sub-workflows
        step1 = DataFetchWorkflow()
        data = await step1.run(request["source"])
        
        step2 = DataProcessWorkflow()
        processed = await step2.run(data.value)
        
        step3 = ReportGenerationWorkflow()
        report = await step3.run(processed.value)
        
        return WorkflowResult(value={
            "data": data.value,
            "processed": processed.value,
            "report": report.value
        })
```

### Workflow with Agents

Integrate agents into workflows:

```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@app.workflow
class AgentWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # Create specialized agent
        agent = Agent(
            name="researcher",
            instruction="Research thoroughly and provide detailed analysis.",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            # Attach LLM
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # Execute task
            result = await llm.generate_str(task)
            
            return WorkflowResult(value=result)
```

### Parallel Workflow Execution

Execute multiple workflows in parallel:

```python
import asyncio
from typing import List

@app.workflow
class ParallelWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, tasks: List[str]) -> WorkflowResult[dict]:
        # Create workflow instances
        workflows = [
            TaskWorkflow() for _ in tasks
        ]
        
        # Run in parallel
        results = await asyncio.gather(*[
            w.run(task) for w, task in zip(workflows, tasks)
        ])
        
        # Combine results
        combined = {
            f"task_{i}": r.value 
            for i, r in enumerate(results)
        }
        
        return WorkflowResult(value=combined)
```
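
By default `asyncio.gather` raises the first exception it sees, so one failing task hides the results of the others. A possible variation, sketched here with plain coroutines rather than workflow classes, passes `return_exceptions=True` and records a per-task outcome. `run_all` and the outcome dict shape are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_all(
    worker: Callable[[str], Awaitable[Any]], tasks: List[str]
) -> Dict[str, Dict[str, Any]]:
    """Run `worker` on every task concurrently and keep per-task outcomes."""
    outcomes = await asyncio.gather(
        *(worker(task) for task in tasks),
        return_exceptions=True,  # failures come back as exception objects
    )
    combined: Dict[str, Dict[str, Any]] = {}
    for i, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            combined[f"task_{i}"] = {"ok": False, "error": str(outcome)}
        else:
            combined[f"task_{i}"] = {"ok": True, "value": outcome}
    return combined
```

`gather` preserves input order, so `task_{i}` always corresponds to `tasks[i]` regardless of which coroutine finishes first.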

### Stateful Workflows

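Keep state on the workflow instance so it carries over between calls to `run` on that same instance. The state lives in memory, so it is lost when the instance is discarded: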

```python
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    def __init__(self):
        super().__init__()
        self.state = {}
    
    @app.workflow_run
    async def run(self, action: dict) -> WorkflowResult[dict]:
        action_type = action.get("type")
        
        if action_type == "set":
            self.state[action["key"]] = action["value"]
            return WorkflowResult(value={"status": "set"})
        
        elif action_type == "get":
            value = self.state.get(action["key"])
            return WorkflowResult(value={"value": value})
        
        elif action_type == "clear":
            self.state.clear()
            return WorkflowResult(value={"status": "cleared"})
        
        return WorkflowResult(value=self.state)
```
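
The scope of instance state is easy to check in isolation. The sketch below mirrors the set/get/clear actions in a plain class. `KeyValueState` is a hypothetical stand-in with no mcp-agent dependency, and it says nothing about how the framework creates or reuses workflow instances.

```python
from typing import Any, Dict


class KeyValueState:
    """Hypothetical stand-in mirroring the set/get/clear actions above."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}

    def apply(self, action: Dict[str, Any]) -> Dict[str, Any]:
        kind = action.get("type")
        if kind == "set":
            self.state[action["key"]] = action["value"]
            return {"status": "set"}
        if kind == "get":
            return {"value": self.state.get(action["key"])}
        if kind == "clear":
            self.state.clear()
            return {"status": "cleared"}
        # Unknown action: return a copy so callers cannot mutate internal state.
        return dict(self.state)
```

Two instances never see each other's keys, so anything that must outlive a single instance needs an external store.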

## Temporal Integration

The same workflow definitions can run on Temporal for durable execution. Select the execution engine in the app settings:

```python
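from mcp_agent.app import MCPApp
from mcp_agent.config import Settings, TemporalSettings
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.executor.workflow_signal import Signal

# Configure for Temporal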
app = MCPApp(
    name="temporal_agent",
    settings=Settings(
        execution_engine="temporal",
        temporal=TemporalSettings(
            host="localhost",
            port=7233,
            namespace="default",
            task_queue="mcp-agent"
        )
    )
)

@app.workflow
class DurableWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # This workflow is now durable
        # It can be paused, resumed, and retried
        
        # Wait for signal (human-in-the-loop)
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id)
        )
        
        # Continue after approval
        result = await self.process_with_approval(task)
        return WorkflowResult(value=result)
```
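
The wait-for-approval step can be prototyped in-process before involving Temporal. The sketch below models the signal with an `asyncio.Event` and adds a timeout. `ApprovalGate` and `process_with_gate` are hypothetical names; this stand-in is not durable and does not use mcp-agent's signal bus.

```python
import asyncio


class ApprovalGate:
    """In-memory stand-in for a named signal (hypothetical, not durable)."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def approve(self) -> None:
        # Called by whoever grants approval, e.g. an HTTP handler or a CLI.
        self._event.set()

    async def wait(self, timeout: float) -> bool:
        """Return True if approved within `timeout` seconds, else False."""
        try:
            await asyncio.wait_for(self._event.wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def process_with_gate(task: str, gate: ApprovalGate, timeout: float) -> str:
    """Block on the gate, then report whether the task was approved in time."""
    approved = await gate.wait(timeout)
    return f"approved: {task}" if approved else f"timed out: {task}"
```

Unlike a durable signal, this gate loses its state if the process restarts, which is the gap Temporal is meant to close.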

## MCP Server Integration

### Exposing Workflows as MCP Tools

Workflows and tools are automatically exposed when creating an MCP server:

```python
from mcp_agent.server.app_server import create_mcp_server_for_app

# Define workflows and tools
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

@app.tool
async def my_tool(param: str) -> str:
    return f"Tool result: {param}"

# Create MCP server
async def main():
    async with app.run():
        mcp_server = create_mcp_server_for_app(app)
        
        # Available tools:
        # - workflows-list
        # - workflows-MyWorkflow-run
        # - workflows-get_status
        # - my_tool
        
        await mcp_server.run_stdio_async()
```

### Tool Discovery

MCP clients can discover available tools:

```python
# From MCP client perspective (`server` is a connected MCP client session)
result = await server.list_tools()
for tool in result.tools:
    print(f"Tool: {tool.name}")
    print(f"Description: {tool.description}")
    print(f"Parameters: {tool.inputSchema}")
```
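
Once a client has the tool names, it can separate workflow entry points from plain tools. The helper below is a hypothetical sketch that relies only on the naming pattern shown in the tool listing earlier in this section (`workflows-<Name>-run`, `workflows-list`, `workflows-get_status`). Treat that pattern as an assumption rather than a guaranteed contract.

```python
from typing import Dict, List

WORKFLOW_PREFIX = "workflows-"
RUN_SUFFIX = "-run"


def group_tools(names: List[str]) -> Dict[str, List[str]]:
    """Split tool names into workflow runners, management tools, and plain tools."""
    groups: Dict[str, List[str]] = {"workflow_runs": [], "management": [], "tools": []}
    for name in names:
        if not name.startswith(WORKFLOW_PREFIX):
            groups["tools"].append(name)
        elif name.endswith(RUN_SUFFIX):
            # "workflows-MyWorkflow-run" -> "MyWorkflow"
            groups["workflow_runs"].append(name[len(WORKFLOW_PREFIX):-len(RUN_SUFFIX)])
        else:
            groups["management"].append(name)
    return groups
```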

## Best Practices

<AccordionGroup>
  <Accordion title="Choose the Right Abstraction">
    - Use `@app.tool` for simple, stateless operations
    - Use `@app.async_tool` for long-running operations that need polling
    - Use `Workflow` class for complex, multi-step processes
  </Accordion>
  
  <Accordion title="Type Hints and Documentation">
    Always provide type hints and docstrings:
    ```python
    @app.tool
    async def process_data(
        data: dict,
        options: Optional[dict] = None
    ) -> dict:
        """
        Process data with optional transformations.
        
        Args:
            data: Input data to process
            options: Optional processing options
            
        Returns:
            Processed data dictionary
        """
        # Implementation
    ```
  </Accordion>
  
  <Accordion title="Error Handling">
    Handle errors gracefully:
    ```python
    @app.workflow
    class SafeWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            try:
                result = await self.process(input)
                return WorkflowResult(value=result)
            except Exception as e:
                app.context.logger.error(f"Processing failed: {e}")
                return WorkflowResult(
                    value=None,
                    error=str(e)
                )
    ```
  </Accordion>
  
  <Accordion title="Resource Management">
    Use context managers for resources:
    ```python
    @app.workflow
    class ResourceWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, query: str) -> WorkflowResult[str]:
            async with self.get_database() as db:
                result = await db.query(query)
                return WorkflowResult(value=result)
    ```
  </Accordion>
  
  <Accordion title="Logging and Observability">
    Use structured logging:
    ```python
    @app.tool
    async def monitored_tool(input: str, app_ctx: Optional[Context] = None) -> str:
        # Log only when a context was injected; the work itself always runs
        logger = app_ctx.logger if app_ctx else None
        if logger:
            logger.info("Tool started", data={"input": input})

        try:
            result = await process(input)
        except Exception as e:
            if logger:
                logger.error("Tool failed", data={"error": str(e)})
            raise

        if logger:
            logger.info("Tool completed", data={"result_length": len(result)})
        return result
    ```
  </Accordion>
</AccordionGroup>

## Testing Workflows

Test your workflows locally:

```python
import pytest

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult

@pytest.mark.asyncio
async def test_workflow():
    app = MCPApp(name="test_app")
    
    @app.workflow
    class TestWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            return WorkflowResult(value=input.upper())
    
    async with app.run():
        workflow = TestWorkflow()
        result = await workflow.run("hello")
        assert result.value == "HELLO"
```

## Migration Guide

### From Functions to Tools

```python
# Before: Plain function
async def calculate(x: int, y: int) -> int:
    return x + y

# After: MCP tool
@app.tool
async def calculate(x: int, y: int) -> int:
    """Calculate sum of two numbers."""
    return x + y
```

### From Scripts to Workflows

```python
# Before: Script
async def main():
    data = await fetch_data()
    processed = await process_data(data)
    await save_results(processed)

# After: Workflow
@app.workflow
class DataPipeline(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        data = await self.fetch_data(source)
        processed = await self.process_data(data)
        await self.save_results(processed)
        return WorkflowResult(value=processed)
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Workflow Patterns" icon="diagram-project" href="/workflows/overview">
    Explore pre-built workflow patterns
  </Card>
  <Card title="Agent Server" icon="server" href="/cloud/agent-server">
    Deploy workflows as MCP servers
  </Card>
  <Card title="Temporal Integration" icon="clock" href="/advanced/temporal">
    Add durability with Temporal
  </Card>
  <Card title="Examples" icon="code" href="https://github.com/lastmile-ai/mcp-agent/tree/main/examples">
    See workflows in action
  </Card>
</CardGroup>